//package com.zj.learn.flux;
//
//import java.util.concurrent.Flow;
//import java.util.concurrent.SubmissionPublisher;
//
///**
// * @author xi.yang
// * @create 2019-02-13 10:15
// **/
//public class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {
//    private Flow.Subscription subscription;
//    @Override
//    public void onSubscribe(Flow.Subscription subscription) {
//        this.subscription = subscription;
//        this.subscription.request(1);
//    }
//
//    @Override
//    public void onNext(Integer item) {
//        System.out.println("处理器接收数据：" + item);
//        if (item > 0) {
//            this.submit("转换后的数据：" + item);
//        }
//        this.subscription.request(1);
//    }
//
//    @Override
//    public void onError(Throwable throwable) {
//        throwable.printStackTrace();
//        this.subscription.cancel();
//    }
//
//    @Override
//    public void onComplete() {
//        System.out.println("处理完成");
//        this.close();
//    }
//
//    public static void main(String[] args) {
//        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
//        MyProcessor myProcessor = new MyProcessor();
//        publisher.subscribe(myProcessor);
//
//        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
//            @Override
//            public void onSubscribe(Flow.Subscription subscription) {
//                this.subscription = subscription;
//                this.subscription.request(1);
//            }
//
//            @Override
//            public void onNext(String item) {
//                System.out.println("接收到数据： " + item);
//                this.subscription.request(1);
//            }
//
//            @Override
//            public void onError(Throwable throwable) {
//                throwable.printStackTrace();
//                this.subscription.cancel();
//            }
//
//            @Override
//            public void onComplete() {
//                System.out.println("处理完了");
//            }
//
//            private Flow.Subscription subscription;
//        };
//
//        myProcessor.subscribe(subscriber);
//
//        publisher.submit(-111);
//        publisher.submit(111);
//        publisher.submit(222);
//        publisher.submit(333);
//        publisher.submit(444);
//
//        publisher.close();
//        try {
//            Thread.currentThread().join(5000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//    }
//}
